Fix: Prevent duplicate edge_job insertions for deferrable tasks in EdgeExecutor (#53610) #53927
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
jscheffl
left a comment
Very good! Thanks for the fix! I can confirm this resolved the problem!
To be merged, can you please also extend the existing use cases in providers/edge3/tests/unit/edge3/executors/test_edge_executor.py to prevent a future regression?
|
@jscheffl Added test coverage under test_edge_executor.py to cover the updated behavior and prevent future regression.
Thanks!
|
@jscheffl Thanks for the feedback! I’ve updated the code to conditionally call validate_airflow_tasks_run_command using a hasattr check. This ensures compatibility with Airflow 2.10 and 2.11, where the method may not be present.
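For readers following the thread, the kind of hasattr guard described in this comment can be sketched as below. This is an illustration only, not the code from the PR: the two executor classes and the helper validate_if_supported are hypothetical stand-ins, and only the method name validate_airflow_tasks_run_command comes from the comment above.

```python
class ExecutorWithValidator:
    """Hypothetical stand-in for an executor base that provides the validator."""

    def validate_airflow_tasks_run_command(self, command):
        # Assumed check for illustration: the command must start with "airflow tasks run".
        if list(command[:3]) != ["airflow", "tasks", "run"]:
            raise ValueError(f"Unexpected command: {command!r}")


class ExecutorWithoutValidator:
    """Hypothetical stand-in for an executor base that lacks the validator."""


def validate_if_supported(executor, command):
    """Call the validator only when the executor actually exposes it.

    Returns True if validation ran, False if it was skipped.
    """
    if hasattr(executor, "validate_airflow_tasks_run_command"):
        executor.validate_airflow_tasks_run_command(command)
        return True
    return False


command = ["airflow", "tasks", "run", "example_dag", "example_task", "example_run"]
ran_with = validate_if_supported(ExecutorWithValidator(), command)
ran_without = validate_if_supported(ExecutorWithoutValidator(), command)
```

The guard keeps one code path for both cases: validation still runs wherever the method exists, and its absence is treated as "nothing to validate" rather than as an AttributeError.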
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
jscheffl
left a comment
"almost" good to merge but some nit about log formatting/commented code. Then I think it is good to merge.
jscheffl
left a comment
Thanks for the multiple iterations for the fix and staying tuned until now! Finally all is green and looking good! LGTM! Thanks!
|
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
…geExecutor (apache#53610) (apache#53927)
* Fix: Handle duplicate entries in EdgeExecutor for deferrable tasks
* Add test cases to validate EdgeExecutor job update and worker sync behavior
* Fix: Ensure execute_async handles command validation safely across Airflow versions
* Fix test to use valid Airflow task command format
* Clean up: Remove unused debug logs from EdgeExecutor
---------
Co-authored-by: lakshminarayana.kumbha <lakshminarayana.kumbha@zemosolabs.com>
Overview
This PR fixes a crash in the Apache Airflow EdgeExecutor caused by duplicate entries in the edge_job table when handling deferrable tasks like ExternalTaskSensor(mode="reschedule").
Root Cause
When a deferrable task times out and retries, both execute_async() and queue_workload() attempt to insert the same job into the edge_job table again, violating uniqueness constraints and causing the scheduler to crash.
Solution
Instead of blindly inserting, both methods now first check for an existing job with the same key (dag_id, task_id, run_id, map_index, try_number).
This preserves job queue integrity and ensures tasks are picked up correctly by the edge worker.
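The check-before-insert idea can be illustrated with a small standalone sketch. It is not the EdgeExecutor code: it uses the standard-library sqlite3 module and a simplified edge_job table instead of the provider's own database model, the helper names (create_table, blind_insert, queue_job) are hypothetical, and the choice to reset the existing row to a "queued" state with the new command is an assumption made for illustration.

```python
import sqlite3

KEY_COLUMNS = ("dag_id", "task_id", "run_id", "map_index", "try_number")


def create_table(conn):
    # Simplified stand-in for the edge_job table; the composite primary key
    # plays the role of the uniqueness constraint described above.
    conn.execute(
        """
        CREATE TABLE edge_job (
            dag_id TEXT NOT NULL,
            task_id TEXT NOT NULL,
            run_id TEXT NOT NULL,
            map_index INTEGER NOT NULL,
            try_number INTEGER NOT NULL,
            state TEXT NOT NULL,
            command TEXT NOT NULL,
            PRIMARY KEY (dag_id, task_id, run_id, map_index, try_number)
        )
        """
    )


def blind_insert(conn, key, command):
    # The failure mode: a second insert with the same key raises IntegrityError.
    conn.execute(
        "INSERT INTO edge_job VALUES (?, ?, ?, ?, ?, 'queued', ?)",
        (*key, command),
    )


def queue_job(conn, key, command):
    """Look up the job by key first; update it if present, insert otherwise."""
    where = " AND ".join(f"{col} = ?" for col in KEY_COLUMNS)
    existing = conn.execute(f"SELECT 1 FROM edge_job WHERE {where}", key).fetchone()
    if existing:
        # Assumption for illustration: re-queue the existing row with the new command.
        conn.execute(
            f"UPDATE edge_job SET state = 'queued', command = ? WHERE {where}",
            (command, *key),
        )
        return "updated"
    blind_insert(conn, key, command)
    return "inserted"


conn = sqlite3.connect(":memory:")
create_table(conn)
key = ("example_dag", "wait_for_upstream", "example_run", -1, 1)
first = queue_job(conn, key, "first command")
second = queue_job(conn, key, "second command")
```

Calling queue_job twice with the same key leaves a single row in the table, whereas calling blind_insert a second time with that key fails with sqlite3.IntegrityError, which mirrors the crash described under Root Cause.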
Changes Made
execute_async() and queue_workload() in: airflow/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
Related Issue
Closes #53610